package cn.doitedu.tech.validate;

import lombok.*;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.roaringbitmap.RoaringBitmap;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.ObjectInputStream;
import java.util.HashMap;

public class FlinkMain {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(2000);
        env.getCheckpointConfig().setCheckpointStorage("file:/d:/ckpt");
        env.setParallelism(1);

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);


        // 读取用户行为事件
        // 1,e01,1000
        DataStreamSource<String> events = env.socketTextStream("doitedu", 9099);

        SingleOutputStreamOperator<EventBean> beans = events.map(s -> {
            String[] arr = s.split(",");
            return new EventBean(Integer.parseInt(arr[0]), arr[1], Long.parseLong(arr[2]));
        });


        //
        /*tEnv.executeSql("CREATE TABLE rule_resource (    " +
                "      rule_id INT,                                 " +
                "      rule_param STRING,                           " +
                "      target_users binary,                         " +
                "      tag_init_values binary,                      " +
                "      groovy_code STRING ,                         " +
                "     PRIMARY KEY (rule_id) NOT ENFORCED            " +
                "     ) WITH (                                 " +
                "     'connector' = 'mysql-cdc',               " +
                "     'hostname' = 'doitedu'   ,               " +
                "     'port' = '3306'          ,               " +
                "     'username' = 'root'      ,               " +
                "     'password' = 'root'      ,               " +
                "     'database-name' = 'rtmk',                " +
                "     'table-name' = 'rule_resource'           " +
                ")");
        Table table = tEnv.from("rule_resource");
        DataStream<Row> rowDataStream = tEnv.toChangelogStream(table);
        SingleOutputStreamOperator<RuleResource> ruleResourceDataStream = rowDataStream.map(new MapFunction<Row, RuleResource>() {
            @Override
            public RuleResource map(Row row) throws Exception {
                Integer rule_id = row.<Integer>getFieldAs("rule_id");
                String rule_param = row.<String>getFieldAs("rule_param");
                byte[] target_users = row.<byte[]>getFieldAs("target_users");
                byte[] tag_init_values = row.<byte[]>getFieldAs("tag_init_values");
                String groovy_code = row.<String>getFieldAs("groovy_code");

                return new RuleResource(rule_id, rule_param, target_users, tag_init_values, groovy_code);
            }
        });


        SingleOutputStreamOperator<RuleResource2> ruleResource2 = ruleResourceDataStream.map(new MapFunction<RuleResource, RuleResource2>() {
            @Override
            public RuleResource2 map(RuleResource value) throws Exception {

                RuleResource2 ruleResource2 = new RuleResource2();
                ruleResource2.setRule_id(value.rule_id);
                ruleResource2.setRule_param(value.getRule_param());
                ruleResource2.setGroovy_code(value.getGroovy_code());

                // bitmap反序列化
                byte[] target_users_bytes = value.getTarget_users();
                RoaringBitmap bm = RoaringBitmap.bitmapOf();

                ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(target_users_bytes);
                DataInputStream dataInput = new DataInputStream(byteArrayInputStream);

                bm.deserialize(dataInput);

                ruleResource2.setTargetUserBm(bm);

                // 反序列化历史初始值
                byte[] tag_init_values = value.getTag_init_values();
                ByteArrayInputStream byteArrayInputStream2 = new ByteArrayInputStream(tag_init_values);
                ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream2);
                HashMap<Integer, Integer> initValues = (HashMap<Integer, Integer>) objectInputStream.readObject();

                ruleResource2.setTag_init_values(initValues);


                return ruleResource2;
            }
        });*/


        beans.keyBy(EventBean::getUserId)
                .process(new KeyedProcessFunction<Integer, EventBean, String>() {
                    Rule01Caculator rule01Caculator;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        rule01Caculator = new Rule01Caculator();
                        HashMap<Integer, Integer> initValues = new HashMap<>();
                        initValues.put(1,1);
                        initValues.put(5,2);

                        MapState<Integer, Integer> mapState = getRuntimeContext().getMapState(new MapStateDescriptor<Integer, Integer>("rule01_tag01", Integer.class, Integer.class));

                        rule01Caculator.open(mapState,initValues);
                    }

                    @Override
                    public void processElement(EventBean eventBean, KeyedProcessFunction<Integer, EventBean, String>.Context ctx, Collector<String> out) throws Exception {
                        rule01Caculator.tagCaculate(eventBean);

                        out.collect(rule01Caculator.tagQuery());
                    }
                }).print();


        env.execute();


    }


    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class RuleResource {
        int rule_id;
        String rule_param;
        byte[] target_users;
        byte[] tag_init_values;
        String groovy_code;
    }


    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class RuleResource2 {
        int rule_id;
        String rule_param;
        RoaringBitmap targetUserBm;
        HashMap<Integer, Integer> tag_init_values;
        String groovy_code;

    }


}
